import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Example Flink job that registers a Kafka topic as a SQL table via DDL,
 * runs a continuous aggregation over it, and prints the changelog.
 *
 * <p>Pipeline: Kafka topic {@code my_topic} (JSON records) → table
 * {@code user_log1} → {@code GROUP BY item_id} count → retract stream on stdout.
 */
public class KafkaFlinkDDL {

    /**
     * Entry point: builds the execution environment, declares the Kafka
     * source table, and submits the streaming job.
     *
     * @param args unused command-line arguments
     * @throws Exception if job submission or execution fails
     */
    public static void main(String[] args) throws Exception {

        // Create the Flink streaming execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create the table environment bridging the DataStream and Table APIs.
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // NOTE(review): sqlUpdate is deprecated since Flink 1.11 — prefer
        // tableEnv.executeSql(...) once the project is on a recent Flink version.
        tableEnv.sqlUpdate("CREATE TABLE user_log1 (\n" +
                "    user_id VARCHAR,\n" +
                "    item_id VARCHAR,\n" +
                "    category_id VARCHAR,\n" +
                "    behavior VARCHAR,\n" +
                "    ts VARCHAR\n" +
                ") WITH (\n" +
                "    'connector.type' = 'kafka',\n" +
                "    'connector.version' = 'universal',\n" +
                "    'connector.topic' = 'my_topic',\n" +
                // Valid startup modes: "earliest-offset", "latest-offset",
                // "group-offsets", or "specific-offsets".
                "    'connector.startup-mode' = 'earliest-offset',\n" +
                "    'connector.properties.group.id' = 'testGroup',\n" +
                "    'connector.properties.zookeeper.connect' = 'Desktop:2181,Desktop:2182,Desktop:2183',\n" +
                "    'connector.properties.bootstrap.servers' = 'Desktop:9091',\n" +
                "    'format.type' = 'json'\n" +
                ")"
        );

        // Continuous count of events per item; the result is an updating table.
        Table result = tableEnv.sqlQuery("select item_id,count(*) from user_log1 group by item_id");

        // A GROUP BY query produces updates, so a retract stream (add/delete
        // flags) is required — an append stream would fail here.
        tableEnv.toRetractStream(result, Types.TUPLE(Types.STRING, Types.LONG)).print();

        env.execute("flink job");
    }
}
